/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.execute;

import static org.apache.phoenix.util.ScanUtil.isPacingScannersPossible;
import static org.apache.phoenix.util.ScanUtil.isRoundRobinPossible;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
import org.apache.phoenix.execute.visitor.ByteCountVisitor;
import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.BaseResultIterators;
import org.apache.phoenix.iterate.ChunkedResultIterator;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LimitingResultIterator;
import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator;
import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
import org.apache.phoenix.iterate.OffsetResultIterator;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelIterators;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.SerialIterators;
import org.apache.phoenix.iterate.SpoolingResultIterator;
import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.base.Optional;

/**
 * Query plan for a basic table scan
 * @since 0.1
 */
public class ScanPlan extends BaseQueryPlan {
  private static final Logger LOGGER = LoggerFactory.getLogger(ScanPlan.class);
  private List<KeyRange> splits;
  private List<List<Scan>> scans;
  private boolean allowPageFilter;
  private boolean isSerial;
  private boolean isDataToScanWithinThreshold;
  private Long serialRowsEstimate;
  private Long serialBytesEstimate;
  private Long serialEstimateInfoTs;
  private OrderBy actualOutputOrderBy;
  private Optional<byte[]> rowOffset;

  public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table,
    RowProjector projector, Integer limit, Integer offset, OrderBy orderBy,
    ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, QueryPlan dataPlan,
    Optional<byte[]> rowOffset) throws SQLException {
    super(context, statement, table, projector, context.getBindManager().getParameterMetaData(),
      limit, offset, orderBy, GroupBy.EMPTY_GROUP_BY,
      parallelIteratorFactory != null
        ? parallelIteratorFactory
        : buildResultIteratorFactory(context, statement, table, orderBy, limit, offset,
          allowPageFilter),
      dataPlan);
    this.allowPageFilter = allowPageFilter;
    boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
    if (isOrdered) { // TopN
      serializeScanRegionObserverIntoScan(context.getScan(),
        limit == null ? -1 : QueryUtil.getOffsetLimit(limit, offset),
        orderBy.getOrderByExpressions(), projector.getEstimatedRowByteSize());
      ScanUtil.setClientVersion(context.getScan(), MetaDataProtocol.PHOENIX_VERSION);
    }
    Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
    perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset);
    Pair<Long, Long> estimate =
      getEstimateOfDataSizeToScanIfWithinThreshold(context, table.getTable(), perScanLimit);
    this.isDataToScanWithinThreshold = estimate != null;
    this.isSerial = isSerial(context, statement, tableRef, orderBy, isDataToScanWithinThreshold);
    if (isSerial) {
      serialBytesEstimate = estimate.getFirst();
      serialRowsEstimate = estimate.getSecond();
      serialEstimateInfoTs = StatisticsUtil.NOT_STATS_BASED_TS;
    }
    this.actualOutputOrderBy = convertActualOutputOrderBy(orderBy, context);
    this.rowOffset = rowOffset;
  }

  // Static because called from tests
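  /**
   * Serializes the TopN state into the {@link BaseScannerRegionObserverConstants#TOPN} scan
   * attribute so the server-side region observer can apply the ordering. The serialized layout,
   * matching the writes below, is:
   *
   * <pre>
   * vint: limit (-1 when there is no limit)
   * vint: estimated row size in bytes
   * vint: number of order-by expressions
   * each OrderByExpression, serialized via its write() method
   * </pre>
   */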
  public static void serializeScanRegionObserverIntoScan(Scan scan, int limit,
    List<OrderByExpression> orderByExpressions, int estimatedRowSize) {
    try (ByteArrayOutputStream stream = new ByteArrayOutputStream(); // TODO: size?
      DataOutputStream output = new DataOutputStream(stream)) {
      WritableUtils.writeVInt(output, limit);
      WritableUtils.writeVInt(output, estimatedRowSize);
      WritableUtils.writeVInt(output, orderByExpressions.size());
      for (OrderByExpression orderingCol : orderByExpressions) {
        orderingCol.write(output);
      }
      scan.setAttribute(BaseScannerRegionObserverConstants.TOPN, stream.toByteArray());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

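  /**
   * A query runs serially only when the estimated amount of data to scan is within the serial
   * threshold and {@link ScanUtil#canQueryBeExecutedSerially} allows it; the SERIAL hint alone is
   * not sufficient and is ignored (with a warning) when serial execution is not possible.
   */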
  private static boolean isSerial(StatementContext context, FilterableStatement statement,
    TableRef tableRef, OrderBy orderBy, boolean isDataWithinThreshold) throws SQLException {
    if (isDataWithinThreshold) {
      PTable table = tableRef.getTable();
      boolean hasSerialHint = statement.getHint().hasHint(HintNode.Hint.SERIAL);
      boolean canBeExecutedSerially = ScanUtil.canQueryBeExecutedSerially(table, orderBy, context);
      if (!canBeExecutedSerially) {
        if (hasSerialHint) {
          LOGGER.warn("This query cannot be executed serially. Ignoring the hint");
        }
        return false;
      }
      return true;
    }
    return false;
  }

  /**
   * @return a pair whose first element is the estimated number of bytes that will be scanned and
   *         whose second element is the estimated number of rows, or null if the estimate is
   *         beyond the serial threshold or cannot be determined.
   */
  private static Pair<Long, Long> getEstimateOfDataSizeToScanIfWithinThreshold(
    StatementContext context, PTable table, Integer perScanLimit) throws SQLException {
    Scan scan = context.getScan();
    ConnectionQueryServices services = context.getConnection().getQueryServices();
    long estRowSize = SchemaUtil.estimateRowSize(table);
    long regionSize = services.getProps().getLong(HConstants.HREGION_MAX_FILESIZE,
      HConstants.DEFAULT_MAX_FILE_SIZE);
    if (perScanLimit == null || scan.getFilter() != null) {
      /*
       * If a limit is not provided or if we have a filter, then we are not able to decide whether
       * the amount of data we need to scan is less than the threshold.
       */
      return null;
    }
    float factor = services.getProps().getFloat(QueryServices.LIMITED_QUERY_SERIAL_THRESHOLD,
      QueryServicesOptions.DEFAULT_LIMITED_QUERY_SERIAL_THRESHOLD);
    long threshold = (long) (factor * regionSize);
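    // For example, with the default serial threshold factor (0.2) and the default 10 GB max
    // region size, a limited query estimated to scan less than ~2 GB qualifies for serial
    // execution. (Defaults cited here are illustrative; the deployed configuration governs.)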
    long estimatedBytes = perScanLimit * estRowSize;
    long estimatedRows = perScanLimit;
    return estimatedBytes < threshold ? new Pair<>(estimatedBytes, estimatedRows) : null;
  }

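  /**
   * Chooses the iterator factory used when the caller does not supply one: a no-op factory when
   * the query will run serially, round robin, or with scanner pacing; a spooling factory when an
   * order by forces the full result to be materialized first; and a chunked spooling factory
   * otherwise, so a simple scan returns its first results sooner.
   */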
  @SuppressWarnings("deprecation")
  private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context,
    FilterableStatement statement, TableRef tableRef, OrderBy orderBy, Integer limit,
    Integer offset, boolean allowPageFilter) throws SQLException {

    if (
      (isSerial(context, statement, tableRef, orderBy,
        getEstimateOfDataSizeToScanIfWithinThreshold(context, tableRef.getTable(),
          QueryUtil.getOffsetLimit(limit, offset)) != null)
        || isRoundRobinPossible(orderBy, context) || isPacingScannersPossible(context))
    ) {
      return ParallelIteratorFactory.NOOP_FACTORY;
    }
    ParallelIteratorFactory spoolingResultIteratorFactory =
      new SpoolingResultIterator.SpoolingResultIteratorFactory(
        context.getConnection().getQueryServices());

    // If we're doing an order by then we need the full result before we can do anything,
    // so we don't bother chunking it. If we're just doing a simple scan then we chunk
    // the scan to have a quicker initial response.
    if (!orderBy.getOrderByExpressions().isEmpty()) {
      return spoolingResultIteratorFactory;
    } else {
      return new ChunkedResultIterator.ChunkedResultIteratorFactory(spoolingResultIteratorFactory,
        context.getConnection().getMutationState(), tableRef);
    }
  }

  @Override
  public Cost getCost() {
    Long byteCount = null;
    try {
      byteCount = getEstimatedBytesToScan();
    } catch (SQLException e) {
      // ignored.
    }
    Double outputBytes = this.accept(new ByteCountVisitor());

    if (byteCount == null || outputBytes == null) {
      return Cost.UNKNOWN;
    }

    int parallelLevel =
      CostUtil.estimateParallelLevel(true, context.getConnection().getQueryServices());
    Cost cost = new Cost(0, 0, byteCount);
    if (!orderBy.getOrderByExpressions().isEmpty()) {
      Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, outputBytes, parallelLevel);
      cost = cost.plus(orderByCost);
    }
    return cost;
  }

  @Override
  public List<KeyRange> getSplits() {
    return splits == null ? Collections.emptyList() : splits;
  }

  @Override
  public List<List<Scan>> getScans() {
    return scans == null ? Collections.emptyList() : scans;
  }

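  /**
   * The offset can be pushed to the server only for unordered queries; for salted tables and
   * local indexes whose rows must come back in row key order, a client-side merge sort
   * re-interleaves the results, so the offset has to be applied on the client instead.
   */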
  private static boolean isOffsetPossibleOnServer(StatementContext context, OrderBy orderBy,
    Integer offset, boolean isSalted, IndexType indexType) {
    return offset != null && orderBy.getOrderByExpressions().isEmpty()
      && !((isSalted || indexType == IndexType.LOCAL)
        && ScanUtil.shouldRowsBeInRowKeyOrder(orderBy, context));
  }

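  /**
   * Builds the result iterator chain for this scan: the base iterators run either serially
   * (server-side offset, or a small limited scan) or in parallel, and are then wrapped as needed
   * in merge-sort, round-robin, or concatenating iterators, plus offset, limit, and sequence
   * iterators on the client.
   */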
  @Override
  protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan,
    Map<ImmutableBytesPtr, ServerCache> caches) throws SQLException {
    // Set any scan attributes before creating the scanner, as it will be too late afterwards
    scan.setAttribute(BaseScannerRegionObserverConstants.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
    ResultIterator scanner;
    TableRef tableRef = this.getTableRef();
    PTable table = tableRef.getTable();
    boolean isSalted = table.getBucketNum() != null;
    /*
     * If there is no limit and no TopN, use a parallel iterator so results come back faster.
     * Otherwise, if a limit is provided and the scan is small enough, run the query serially.
     */
    boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
    Integer perScanLimit =
      !allowPageFilter || isOrdered ? null : QueryUtil.getOffsetLimit(limit, offset);
    boolean isOffsetOnServer =
      isOffsetPossibleOnServer(context, orderBy, offset, isSalted, table.getIndexType());
    /*
     * For queries ordered by row key that cannot scan more than a threshold worth of data, we
     * only need to initialize the scanner corresponding to the first (or last, if reverse) scan
     * per region.
     */
    boolean initFirstScanOnly =
      (orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY || orderBy == OrderBy.REV_ROW_KEY_ORDER_BY)
        && isDataToScanWithinThreshold;
    BaseResultIterators iterators;
    if (isOffsetOnServer) {
      iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory,
        scanGrouper, scan, caches, dataPlan);
    } else if (isSerial) {
      iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory,
        scanGrouper, scan, caches, dataPlan);
    } else {
      iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper,
        scan, initFirstScanOnly, caches, dataPlan);
    }
    estimatedRows = iterators.getEstimatedRowCount();
    estimatedSize = iterators.getEstimatedByteCount();
    estimateInfoTimestamp = iterators.getEstimateInfoTimestamp();
    splits = iterators.getSplits();
    scans = iterators.getScans();
    if (isOffsetOnServer) {
      scanner = new ConcatResultIterator(iterators);
      if (limit != null) {
        scanner = new LimitingResultIterator(scanner, limit);
      }
    } else if (isOrdered) {
      scanner =
        new MergeSortTopNResultIterator(iterators, limit, offset, orderBy.getOrderByExpressions());
    } else {
      if (
        (isSalted || table.getIndexType() == IndexType.LOCAL)
          && ScanUtil.shouldRowsBeInRowKeyOrder(orderBy, context)
      ) {
        /*
         * For salted tables or local indexes, a merge sort is needed if:
         * 1) the config phoenix.query.force.rowkeyorder is set to true, or
         * 2) the query has an order by that wants the results sorted by the row key (forward or
         * reverse ordering).
         */
        scanner = new MergeSortRowKeyResultIterator(iterators,
          isSalted ? SaltingUtil.NUM_SALTING_BYTES : 0, orderBy == OrderBy.REV_ROW_KEY_ORDER_BY);
      } else if (useRoundRobinIterator()) {
        /*
         * For any kind of table, round robin is possible when no row ordering is needed.
         */
        scanner = new RoundRobinResultIterator(iterators, this);
      } else {
        scanner = new ConcatResultIterator(iterators);
      }
      if (offset != null) {
        scanner = new OffsetResultIterator(scanner, offset);
      }
      if (limit != null) {
        scanner = new LimitingResultIterator(scanner, limit);
      }
    }

    if (context.getSequenceManager().getSequenceCount() > 0) {
      scanner = new SequenceResultIterator(scanner, context.getSequenceManager());
    }
    return scanner;
  }

  @Override
  public boolean useRoundRobinIterator() throws SQLException {
    return ScanUtil.isRoundRobinPossible(orderBy, context);
  }

  @Override
  public <T> T accept(QueryPlanVisitor<T> visitor) {
    return visitor.visit(this);
  }

  @Override
  public Long getEstimatedRowsToScan() throws SQLException {
    if (isSerial) {
      return serialRowsEstimate;
    }
    return super.getEstimatedRowsToScan();
  }

  @Override
  public Long getEstimatedBytesToScan() throws SQLException {
    if (isSerial) {
      return serialBytesEstimate;
    }
    return super.getEstimatedBytesToScan();
  }

  @Override
  public Long getEstimateInfoTimestamp() throws SQLException {
    if (isSerial) {
      return serialEstimateInfoTs;
    }
    return super.getEstimateInfoTimestamp();
  }

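  /**
   * Derives the order in which this plan actually emits rows: the compiled ORDER BY when one is
   * present, otherwise the table's row key order (forward or reverse) when rows are guaranteed
   * to come back in row key order, and no order at all otherwise.
   */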
  private static OrderBy convertActualOutputOrderBy(OrderBy orderBy,
    StatementContext statementContext) throws SQLException {
    if (!orderBy.isEmpty()) {
      return OrderBy.convertCompiledOrderByToOutputOrderBy(orderBy);
    }

    if (!ScanUtil.shouldRowsBeInRowKeyOrder(orderBy, statementContext)) {
      return OrderBy.EMPTY_ORDER_BY;
    }

    TableRef tableRef = statementContext.getResolver().getTables().get(0);
    return ExpressionUtil.getOrderByFromTable(tableRef, statementContext.getConnection(),
      orderBy == OrderBy.REV_ROW_KEY_ORDER_BY).getFirst();
  }

  @Override
  public List<OrderBy> getOutputOrderBys() {
    return OrderBy.wrapForOutputOrderBys(this.actualOutputOrderBy);
  }

  public Optional<byte[]> getRowOffset() {
    return this.rowOffset;
  }
}
